# fuocore.player source code

# -*- coding: utf-8 -*-

"""
fuocore.player
--------------

This module defines :class:`.AbstractPlayer` and implements :class:`.MpvPlayer`,
which is based on mpv. In addition, it defines the :class:`.Playlist` class,
which the player uses to manage its playlist.

**Simple Usage**

.. sourcecode:: python

    >>> from fuocore.player import MpvPlayer
    >>> player = MpvPlayer()
    >>> player.initialize()
    >>> player.play('xxx')  # local file path or http url
 """

from abc import ABCMeta, abstractmethod
from enum import IntEnum
import locale
import logging
import random

from mpv import MPV, MpvEventID, MpvEventEndFile, \
        _mpv_set_property_string

from fuocore.dispatch import Signal


logger = logging.getLogger(__name__)


class State(IntEnum):
    """Player states.

    The integer values are part of the public contract (this is an
    ``IntEnum``), so they must not be renumbered.
    """

    #: nothing is loaded / playback has been stopped
    stopped = 0
    #: playback is paused
    paused = 1
    #: playback is running
    playing = 2
class PlaybackMode(IntEnum):
    """Playlist playback mode.

    The integer values are part of the public contract (this is an
    ``IntEnum``), so they must not be renumbered.
    """

    one_loop = 0  #: One Loop
    sequential = 1  #: Sequential
    loop = 2  #: Loop
    random = 3  #: Random
class Playlist(object):
    """Player playlist: an ordered list of song models to play.

    NOTE - Design: Why we use song model instead of url? Theoretically,
    using song model may increase the coupling. However, simple url
    do not obtain enough metadata.
    """

    def __init__(self, songs=None, playback_mode=PlaybackMode.loop):
        """
        :param songs: list of :class:`fuocore.models.SongModel`
        :param playback_mode: :class:`fuocore.player.PlaybackMode`
        """
        #: store value for ``current_song`` property
        self._current_song = None
        #: value ``current_song`` had before its most recent assignment;
        #: initialised here so the attribute always exists
        self._last_song = None

        #: songs whose url is invalid
        self._bad_songs = []

        #: store value for ``songs`` property
        self._songs = songs or []

        #: store value for ``playback_mode`` property
        self._playback_mode = playback_mode

        #: playback mode changed signal
        self.playback_mode_changed = Signal()
        #: current song changed signal
        self.song_changed = Signal()

    def __len__(self):
        """Return the number of songs in the playlist."""
        return len(self._songs)

    def __getitem__(self, index):
        """overload [] operator"""
        return self._songs[index]

    def mark_as_bad(self, song):
        """Remember ``song`` as unplayable.

        Only songs that are in the playlist are recorded, and each song is
        recorded at most once.
        """
        if song in self._songs and song not in self._bad_songs:
            self._bad_songs.append(song)

    def add(self, song):
        """Append a song to the end of the playlist (no-op if present)."""
        if song in self._songs:
            return
        self._songs.append(song)
        logger.debug('Add %s to player playlist', song)

    def insert(self, song):
        """Insert a song right after the current song (no-op if present).

        When there is no current song, the song is appended to the end.
        """
        if song in self._songs:
            return
        if self._current_song is None:
            self._songs.append(song)
        else:
            index = self._songs.index(self._current_song)
            self._songs.insert(index + 1, song)

    def remove(self, song):
        """Remove song from playlist. O(n)

        If song is current song, remove the song and play next. Otherwise,
        just remove it.
        """
        if song in self._songs:
            if self._current_song is None:
                self._songs.remove(song)
            elif song == self._current_song:
                next_song = self.next_song
                # In random mode, or when only one song is left, the next
                # song may be the same as the current song.
                if next_song == self.current_song:
                    self.current_song = None
                    self._songs.remove(song)
                    self.current_song = self.next_song
                else:
                    # Reuse the candidate that was just checked. Evaluating
                    # ``self.next_song`` again could, in random mode, yield
                    # the song being removed and leave ``current_song``
                    # pointing at a song that is no longer in the list.
                    self.current_song = next_song
                    self._songs.remove(song)
            else:
                self._songs.remove(song)
            logger.debug('Remove %s from player playlist', song)
        else:
            logger.debug('Remove failed: %s not in playlist', song)

        if song in self._bad_songs:
            self._bad_songs.remove(song)

    def clear(self):
        """remove all songs from playlists"""
        # Reset the current song first so ``song_changed`` is emitted with
        # ``None`` before the list is emptied.
        self.current_song = None
        self._songs = []
        self._bad_songs.clear()

    def list(self):
        """get all songs in playlists"""
        return self._songs

    @property
    def current_song(self):
        """
        current playing song, return None if there is no current song
        """
        return self._current_song

    @current_song.setter
    def current_song(self, song):
        """Set the current song and emit the ``song_changed`` signal.

        A song that is not yet in the playlist is inserted first.

        .. note::

            In theory, this setter should only be used by the Player object.
        """
        self._last_song = self.current_song
        # add it to playlist if song not in playlist
        if song is not None and song not in self._songs:
            self.insert(song)
        self._current_song = song
        self.song_changed.emit(song)

    @property
    def playback_mode(self):
        """Current :class:`PlaybackMode` of the playlist."""
        return self._playback_mode

    @playback_mode.setter
    def playback_mode(self, playback_mode):
        """Store the new mode and emit ``playback_mode_changed``."""
        self._playback_mode = playback_mode
        self.playback_mode_changed.emit(playback_mode)

    def _get_good_song(self, base=0, random_=False, direction=1):
        """Pick a playable song (one not marked as bad) from the playlist.

        :param base: base index
        :param random_: random strategy or not
        :param direction: forward if > 0 else backward
        :return: a song, or ``None`` when no playable song is left

        >>> pl = Playlist([1, 2, 3])
        >>> pl._get_good_song()
        1
        >>> pl._get_good_song(base=1)
        2
        >>> pl._bad_songs = [2]
        >>> pl._get_good_song(base=1, direction=-1)
        1
        >>> pl._get_good_song(base=1)
        3
        >>> pl._bad_songs = [1, 2, 3]
        >>> pl._get_good_song()
        """
        if not self._songs or len(self._songs) <= len(self._bad_songs):
            logger.debug('No good song in playlist.')
            return None

        # Rotate the list so that iteration starts at ``base`` and wraps
        # around, walking forward or backward depending on ``direction``.
        if direction > 0:
            song_list = self._songs[base:] + self._songs[0:base]
        else:
            song_list = self._songs[base::-1] + self._songs[:base:-1]

        good_songs = [song for song in song_list
                      if song not in self._bad_songs]
        if not good_songs:
            # The length check above is only a heuristic (e.g. duplicate
            # entries can defeat it); never index into an empty list.
            logger.debug('No good song in playlist.')
            return None
        if random_:
            return random.choice(good_songs)
        return good_songs[0]

    @property
    def next_song(self):
        """next song for player, calculated based on playback_mode"""
        # No song is playing: pick the first playable song in the list.
        if self.current_song is None:
            return self._get_good_song()

        # Default covers any mode not handled explicitly below.
        next_song = None
        if self.playback_mode == PlaybackMode.random:
            next_song = self._get_good_song(random_=True)
        else:
            current_index = self._songs.index(self.current_song)
            if current_index == len(self._songs) - 1:
                # Current song is the last one: wrap around when looping,
                # stop (``None``) in sequential mode.
                if self.playback_mode in (PlaybackMode.loop,
                                          PlaybackMode.one_loop):
                    next_song = self._get_good_song()
                elif self.playback_mode == PlaybackMode.sequential:
                    next_song = None
            else:
                next_song = self._get_good_song(base=current_index + 1)
        return next_song

    @property
    def previous_song(self):
        """previous song for player to play

        NOTE: not the last played song
        """
        if self.current_song is None:
            return self._get_good_song(base=-1, direction=-1)

        if self.playback_mode == PlaybackMode.random:
            previous_song = self._get_good_song(direction=-1)
        else:
            current_index = self._songs.index(self.current_song)
            previous_song = self._get_good_song(base=current_index - 1,
                                                direction=-1)
        return previous_song
class AbstractPlayer(metaclass=ABCMeta):
    """Player abstract base class."""

    def __init__(self, playlist=None, **kwargs):
        """
        :param playlist: :class:`.Playlist` to play from; a fresh empty
            playlist is created when omitted
        """
        self._position = 0  # seconds
        self._volume = 100  # (0, 100)
        # A ``Playlist()`` default in the signature would be evaluated once
        # and shared by every player. Compare with ``is None`` because an
        # empty Playlist is falsy (it defines ``__len__``).
        self._playlist = Playlist() if playlist is None else playlist
        self._state = State.stopped
        self._duration = None

        #: player position changed signal
        self.position_changed = Signal()

        #: player state changed signal
        self.state_changed = Signal()

        #: current song finished signal
        self.song_finished = Signal()

        #: duration changed signal
        self.duration_changed = Signal()

        #: media change signal
        self.media_changed = Signal()

    @property
    def state(self):
        """Player state

        :return: :class:`.State`
        """
        return self._state

    @state.setter
    def state(self, value):
        """set player state, emit state changed signal

        outer object should not set state directly,
        use ``pause`` / ``resume`` / ``stop`` / ``play`` method instead.
        """
        self._state = value
        self.state_changed.emit(value)

    @property
    def current_song(self):
        """The playlist's current song (``None`` if there is none)."""
        return self._playlist.current_song

    @property
    def playlist(self):
        """player playlist

        :return: :class:`.Playlist`
        """
        return self._playlist

    @property
    def position(self):
        """player position, the units is seconds"""
        return self._position

    @position.setter
    def position(self, position):
        """set player position, the units is seconds

        The base implementation does nothing; subclasses override it.
        """

    @property
    def volume(self):
        """Player volume, kept within the range 0..100."""
        return self._volume

    @volume.setter
    def volume(self, value):
        """Store ``value`` clamped to the range 0..100."""
        value = 0 if value < 0 else value
        value = 100 if value > 100 else value
        self._volume = value

    @property
    def duration(self):
        """player media duration, the units is seconds"""
        return self._duration

    @duration.setter
    def duration(self, value):
        """Store a new duration and emit ``duration_changed``.

        ``None`` and unchanged values are ignored.
        """
        if value is not None and value != self._duration:
            self._duration = value
            self.duration_changed.emit(value)

    @abstractmethod
    def play(self, url):
        """play media

        :param url: a local file absolute path, or a http url that refers to a
            media file
        """

    @abstractmethod
    def play_song(self, song):
        """play media by song model

        :param song: :class:`fuocore.models.SongModel`
        """

    @abstractmethod
    def resume(self):
        """play playback"""

    @abstractmethod
    def pause(self):
        """pause player"""

    @abstractmethod
    def toggle(self):
        """toggle player state"""

    @abstractmethod
    def stop(self):
        """stop player"""

    @abstractmethod
    def initialize(self):
        """initialize player"""

    @abstractmethod
    def shutdown(self):
        """shutdown player, do some clean up here"""


class MpvPlayer(AbstractPlayer):
    """

    player will always play playlist current song. player will listening to
    playlist ``song_changed`` signal and change the current playback.

    TODO: make me singleton
    """

    def __init__(self, audio_device=b'auto', *args, **kwargs):
        """
        :param audio_device: value for mpv's ``audio-device`` property (bytes)
        """
        super(MpvPlayer, self).__init__(*args, **kwargs)
        # https://github.com/cosven/FeelUOwn/issues/246
        locale.setlocale(locale.LC_NUMERIC, 'C')
        self._mpv = MPV(ytdl=False,
                        input_default_bindings=True,
                        input_vo_keyboard=True)
        _mpv_set_property_string(self._mpv.handle, b'audio-device',
                                 audio_device)

    def initialize(self):
        """Hook up mpv property observers, the mpv event callback and the
        playlist / player signals."""
        self._mpv.observe_property(
            'time-pos',
            lambda name, position: self._on_position_changed(position)
        )
        self._mpv.observe_property(
            'duration',
            lambda name, duration: self._on_duration_changed(duration)
        )
        # self._mpv.register_event_callback(lambda event: self._on_event(event))
        self._mpv.event_callbacks.append(self._on_event)
        self._playlist.song_changed.connect(self._on_song_changed)
        self.song_finished.connect(self._on_song_finished)
        logger.info('Player initialize finished.')

    def shutdown(self):
        """Drop the reference to the mpv instance."""
        del self._mpv

    def play(self, url):
        """Start playing ``url`` and emit ``media_changed``.

        :param url: a local file absolute path, or a http url that refers to a
            media file
        """
        # NOTE - API DESGIN: we should return None, see
        # QMediaPlayer API reference for more details.

        logger.debug("Player will play: '%s'", url)

        # Clear playlist before play next song,
        # otherwise, mpv will seek to the last position and play.
        self._mpv.playlist_clear()
        self._mpv.play(url)
        self._mpv.pause = False
        self.state = State.playing
        self.media_changed.emit(url)

    def play_song(self, song):
        """Play the given song.

        If the target song differs from the current song, the playlist's
        current song is changed; the playlist then emits ``song_changed``
        and the player, on receiving it, calls ``play`` -- only then does
        the new song actually start. If the song equals the current song,
        the request is ignored.

        .. note::

            Callers should not assign ``playlist.current_song = song``
            directly to switch songs.
        """
        if song is not None and song == self.current_song:
            logger.warning('The song is already under playing.')
        else:
            self._playlist.current_song = song

    def play_next(self):
        """Play the next song.

        .. note::

            ``play_song(playlist.next_song)`` must not be used here:
            ``play_song`` and ``playlist.current_song = song`` behave
            differently (``play_song`` ignores the current song).
        """
        self.playlist.current_song = self.playlist.next_song

    def play_previous(self):
        """Play the playlist's previous song."""
        self.playlist.current_song = self.playlist.previous_song

    def replay(self):
        """Re-assign the current song so it is played from the start."""
        self.playlist.current_song = self.current_song

    def resume(self):
        """Unpause mpv and switch to the ``playing`` state."""
        self._mpv.pause = False
        self.state = State.playing

    def pause(self):
        """Pause mpv and switch to the ``paused`` state."""
        self._mpv.pause = True
        self.state = State.paused

    def toggle(self):
        """Flip mpv's pause flag and update the state to match."""
        self._mpv.pause = not self._mpv.pause
        if self._mpv.pause:
            self.state = State.paused
        else:
            self.state = State.playing

    def stop(self):
        """Pause mpv, switch to the ``stopped`` state and clear mpv's
        internal playlist."""
        logger.info('stop player...')
        self._mpv.pause = True
        self.state = State.stopped
        self._mpv.playlist_clear()

    @property
    def position(self):
        """player position, the units is seconds"""
        return self._position

    @position.setter
    def position(self, position):
        """Seek mpv to the absolute ``position`` (seconds)."""
        self._mpv.seek(position, reference='absolute')
        self._position = position

    @AbstractPlayer.volume.setter
    def volume(self, value):
        """Clamp and store the volume via the base class, then apply it
        to mpv."""
        super(MpvPlayer, MpvPlayer).volume.__set__(self, value)
        self._mpv.volume = self.volume

    def _on_position_changed(self, position):
        """Callback for mpv ``time-pos`` changes."""
        self._position = position
        self.position_changed.emit(position)

    def _on_duration_changed(self, duration):
        """listening to mpv duration change event"""
        logger.info('Player receive duration changed signal')
        self.duration = duration

    def _on_song_changed(self, song):
        """Callback invoked after the playlist's ``current_song`` changed.

        If the new song has a url it is played; otherwise it is marked as
        bad and the next song is tried. If the new song is ``None``,
        playback is stopped.
        """
        logger.debug('Player received song changed signal')
        if song is not None:
            logger.info('Try to play song: %s', song)
            if song.url:
                self.play(song.url)
            else:
                self._playlist.mark_as_bad(song)
                self.play_next()
        else:
            self.stop()
            logger.info('No good song in player playlist anymore.')

    def _on_event(self, event):
        """mpv event callback: emit ``song_finished`` on END_FILE events,
        unless the player is stopped or the file was aborted."""
        if event['event_id'] == MpvEventID.END_FILE:
            reason = event['event']['reason']
            logger.debug('Current song finished. reason: %d', reason)
            if self.state != State.stopped and reason != MpvEventEndFile.ABORTED:
                self.song_finished.emit()

    def _on_song_finished(self):
        """Replay the song in one-loop mode, otherwise play the next one."""
        if self._playlist.playback_mode == PlaybackMode.one_loop:
            self.replay()
        else:
            self.play_next()